{
  "cells": [
    {
      "cell_type": "markdown",
      "id": "4336558d",
      "metadata": {
        "id": "4336558d"
      },
      "source": [
        "# Data Capture\n",
        "\n",
        "<a href=\"https://colab.research.google.com/github/lwa-project/bifrost/blob/master/tutorial/06_data_capture.ipynb\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open in Colab\"></a>\n",
        "\n",
        "Next we will look at how to use Bifrost to work with packetized data, either from the network or from packets recorded to a file.  This is done through the `bifrost.packet_capture` module.\n",
        "\n"
      ]
    },
    {
      "cell_type": "code",
      "source": [
        "%%capture install_log\n",
        "# Import bifrost, but attempt to auto-install if needed (and we're running on\n",
        "# Colab). If something goes wrong, evaluate install_log.show() in a new block\n",
        "# to retrieve the details.\n",
        "try:\n",
        "  import bifrost\n",
        "except ModuleNotFoundError as exn:\n",
        "  try:\n",
        "    import google.colab\n",
        "  except ModuleNotFoundError:\n",
        "    raise exn\n",
        "  !sudo apt-get -qq install universal-ctags libopenblas-dev librdmacm-dev software-properties-common build-essential\n",
        "  !pip install -q contextlib2 pint simplejson scipy git+https://github.com/ctypesgen/ctypesgen.git\n",
        "  ![ -d ~/bifrost/.git ] || git clone https://github.com/lwa-project/bifrost ~/bifrost\n",
        "  !(cd ~/bifrost && ./configure --disable-hwloc && make -j all && sudo make install)\n",
        "  import bifrost"
      ],
      "metadata": {
        "id": "2w7kFngftyj2"
      },
      "id": "2w7kFngftyj2",
      "execution_count": 6,
      "outputs": []
    },
    {
      "cell_type": "code",
      "execution_count": 8,
      "id": "372e7e14",
      "metadata": {
        "id": "372e7e14"
      },
      "outputs": [],
      "source": [
        "import json\n",
        "import ctypes\n",
        "import threading\n",
        "\n",
        "from bifrost.address import Address\n",
        "from bifrost.udp_socket import UDPSocket\n",
        "from bifrost.packet_capture import PacketCaptureCallback, UDPCapture\n",
        "\n",
        "addr = Address('127.0.0.1', 10000)\n",
        "sock = UDPSocket()\n",
        "sock.bind(addr)\n",
        "sock.timeout = 5.0\n",
        "\n",
        "class CaptureOp(object):\n",
        "    def __init__(self, log, oring, sock, nsrc=16, src0=0, max_size=9000, ntime_gulp=250, ntime_slot=25000, core=-1):\n",
        "        self.log = log\n",
        "        self.oring = oring\n",
        "        self.sock = sock\n",
        "        self.nsrc = nsrc\n",
        "        self.src0 = src0\n",
        "        self.max_size = max_size\n",
        "        self.ntime_gulp = ntime_gulp\n",
        "        self.ntime_slot = ntime_slot\n",
        "        self.core = core\n",
        "        self.shutdown_event = threading.Event()\n",
        "\n",
        "    def shutdown(self):\n",
        "        self.shutdown_event.set()\n",
        "\n",
        "    def seq_callback(\n",
        "        self, seq0, chan0, nchan, nsrc, time_tag_ptr, hdr_ptr, hdr_size_ptr):\n",
        "        timestamp0 = int((self.utc_start - ADP_EPOCH).total_seconds())\n",
        "        time_tag0 = timestamp0 * int(FS)\n",
        "        time_tag = time_tag0 + seq0 * (int(FS) // int(CHAN_BW))\n",
        "        print(\"++++++++++++++++ seq0     =\", seq0)\n",
        "        print(\"                 time_tag =\", time_tag)\n",
        "        time_tag_ptr[0] = time_tag\n",
        "        hdr = {\n",
        "            \"time_tag\": time_tag,\n",
        "            \"seq0\": seq0,\n",
        "            \"chan0\": chan0,\n",
        "            \"nchan\": nchan,\n",
        "            \"cfreq\": (chan0 + 0.5 * (nchan - 1)) * CHAN_BW,\n",
        "            \"bw\": nchan * CHAN_BW,\n",
        "            \"nstand\": nsrc * 16,\n",
        "            \"npol\": 2,\n",
        "            \"complex\": True,\n",
        "            \"nbit\": 4,\n",
        "            \"axes\": \"time,chan,stand,pol\",\n",
        "        }\n",
        "        print(\"******** CFREQ:\", hdr[\"cfreq\"])\n",
        "        hdr_str = json.dumps(hdr)\n",
        "        self.header_buf = ctypes.create_string_buffer(hdr_str)\n",
        "        hdr_ptr[0] = ctypes.cast(self.header_buf, ctypes.c_void_p)\n",
        "        hdr_size_ptr[0] = len(hdr_str)\n",
        "        return 0\n",
        "\n",
        "    def main(self):\n",
        "        seq_callback = PacketCaptureCallback()\n",
        "        seq_callback.set_chips(self.seq_callback)\n",
        "        with UDPCapture('chips', self.sock, self.nsrc, self.src0, self.max_size,\n",
        "                        self.ntime_gulp, self.ntime_slot, sequence_callback=seq_callback,\n",
        "                        core=self.core) as capture:\n",
        "            while not self.shutdown_event.is_set():\n",
        "                status = capture.recv()\n",
        "        del capture"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "3048e76d",
      "metadata": {
        "id": "3048e76d"
      },
      "source": [
        "This block implements data capture of the [CHIPS format](https://github.com/jaycedowell/bifrost/blob/disk-readers/src/formats/chips.hpp#L36) from the network.  The snippet starts out by creating a socket that will be used to receive the data on using `bifrost.address.Address` and `bifrost.udp_socket.UDPSocket`.  The capture block looks similar to other blocks that we have looked at but there are a few things to note.\n",
        " 1. This block accepts many more keywords than the previous block.  These extra keywords are used to control the packet capture and data ordering when it is copied into the ring buffer.  They are:\n",
        "  * `nsrc` - The number of packet sources to expect data from,\n",
        "  * `src0` - The source ID number for the first packet socket,\n",
        "  * `max_size` - The maximum packet size to accept.  This is usually set to 9000 to allow for jumbo packets,\n",
        "  * `ntime_gulp` - This controls the internal buffer size used by the packet capture.  Bifrost keeps two buffers open and releases them to the output ring as data from new gulps is received.\n",
        "  * `ntime_slot` - The approximate number of packet sets (a packet from all `nsrc` sources) per second.  This is used by Bifrost to determine the boundaries in the gulps.\n",
        " 2. There is a an internal `threading.Event` instance that is used as a signal for telling the `CaptureOp` block to stop.  Without the capture would run indefinitely.\n",
        " 3. There is a `seq_callback` method that is called by Bifrost when the packet sequence changes.  This method accepts a format-specific number of arguments and returns a JSON-packed header that sent to the ring.\n",
        " 4. The `main` method implements the packet capture by calling a collection of Bifrost classes:\n",
        "  * First, a new `PacketCaptureCallback` instance is created and then the callback for the CHIPS format is set to `CaptureOp.seq_callback`.  This redies the method for Bifrost to use it when the sequence changes.\n",
        "  * Next, a new `UDPCapture` instance is created for the packet format with the relevant source/data parameters.  This is used as a context for this packet capture itself.\n",
        "  * Finally, `UDPCapture.recv` is called repeatedly to receive and process packets.  This method returns an integer after a gulp has been released to the ring.  This interger stores the current state of the capture.\n",
        "\n",
        "As mentioned before, Bifrost also works with reading packets from a file using the `bifrost.packet_capture.DiskReader` class.  This works similar to `UDPCapture` but the packet format specifiers require extra information in order to read packets from disk.  For example, a CHIPS capture of 132 channels is specified as \"chips\" for `UDPCapture` but as \"chips_132\" for `DiskReader`."
      ]
    },
    {
      "cell_type": "markdown",
      "id": "52efad41",
      "metadata": {
        "id": "52efad41"
      },
      "source": [
        "## Writing Data\n",
        "\n",
        "Related to this capture interface is the `bifrost.packet_writer` module.  This implments the reverse of the capture in that it takes data from a ring and write it to the network or disk.\n",
        "\n",
        "Let's look at an example of writing binary data in the [LWA TBN format](https://fornax.phys.unm.edu/lwa/trac/wiki/DP_Formats#TBNOutputInterface), a stream of 8+8-bit complex integers:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 9,
      "id": "a67846f0",
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "a67846f0",
        "outputId": "12ba73a5-913c-4afb-b812-04cb5f430c68"
      },
      "outputs": [
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "Input:\n",
            "   0 @ 0 : (-9.040182113647461+5.184663772583008j) -> (-9, 5)\n",
            "   1 @ 0 : (-0.7387441396713257-1.1859489679336548j) -> (-1, -1)\n",
            "   2 @ 0 : (14.541327476501465-2.5483381748199463j) -> (15, -3)\n",
            "   3 @ 0 : (10.429906845092773+19.244178771972656j) -> (10, 19)\n",
            "   4 @ 0 : (-1.082539439201355+6.93345832824707j) -> (-1, 7)\n",
            "Output:\n",
            "[(-9, 5), (-1, -1), (15, -3), (10, 19), (-1, 7)]\n"
          ]
        }
      ],
      "source": [
        "import time\n",
        "import numpy\n",
        "from bifrost.packet_writer import HeaderInfo, DiskWriter\n",
        "\n",
        "with open('output.dat', 'wb') as fh:\n",
        "    bfo = DiskWriter('tbn', fh, core=0)\n",
        "    desc = HeaderInfo()\n",
        "    desc.set_tuning(int(round(38e6 / 196e6 * 2**32)))\n",
        "    desc.set_gain(20)\n",
        "    \n",
        "    time_tag = int(time.time()*196e6)\n",
        "    \n",
        "    data = numpy.random.randn(16, 512*10)\n",
        "    data = data + 1j*numpy.random.randn(*data.shape)\n",
        "    for i in range(16):\n",
        "        data[i,:] *= 4\n",
        "    data = bifrost.ndarray(data.astype(numpy.complex64))\n",
        "    \n",
        "    qdata = bifrost.ndarray(shape=data.shape, dtype='ci8')\n",
        "    bifrost.quantize(data, qdata, scale=2)\n",
        "    print('Input:')\n",
        "    for i in range(5):\n",
        "        print('  ', i, '@', 0, ':', data[0,i]*2, '->', qdata[0,i])\n",
        "        \n",
        "    qdata = qdata.reshape(16, -1, 512)\n",
        "    qdata = qdata.transpose(1,0,2).copy()\n",
        "    \n",
        "    bfo.send(desc, time_tag, qdata.shape[0]*1960, 0, 1, qdata)\n",
        "    \n",
        "import struct\n",
        "print('Output:')\n",
        "with open('output.dat', 'rb') as fh:\n",
        "    packet_header = fh.read(24)\n",
        "    packet_payload = fh.read(512*2)\n",
        "    packet_payload = struct.unpack('<1024b', packet_payload)\n",
        "    i, q = packet_payload[0::2], packet_payload[1::2]\n",
        "    print(list(zip(i,q))[:5])\n",
        "    "
      ]
    },
    {
      "cell_type": "markdown",
      "id": "52762b36",
      "metadata": {
        "id": "52762b36"
      },
      "source": [
        "The flow here is:\n",
        " 1. Opening a file in binary write mode and creating new `DiskWriter` and `HeaderInfo` instances.  `DiskWriter` is what actually writes the formatted data to disk and `HeaderInfo` is a metadata helper used to fill in the packet headers as they are written.\n",
        " 2. Setting the \"tuning\" and \"gain\" parameters for the output headers.  These are values that are common to all of the packets written.\n",
        " 3. Creating a time tag for the first sample and a collection of complex data that will go into the packets.\n",
        " 4. Converting the complex data into the 8+8-bit integer format expected for TBN data.  The `DiskWriter` instances are data type-aware.\n",
        " 5. Reshaping `qdata` so that it has axes of (packet number, output index, samples per packet).\n",
        " 6. Actually writing the data to disk with `DiskWriter.send`.  This method takes in:\n",
        "  * a `HeaderInfo` instance used to population the header,\n",
        "  * a starting time tag value,\n",
        "  * a time tag increment to apply when moving to the next packet,\n",
        "  * an output naming starting index,\n",
        "  * a output naming increment to apply when moving to the next output name, and\n",
        "  * the data itself.\n",
        "\n",
        "After this we re-open the file and read in the data to verify that what is written matches what was put in.  Since the data is 8+8-bit complex this is easy to do with some information about the packet structure and the `struct` module.\n",
        "\n",
        "To write to the network rather than a file you would:\n",
        " 1. Swap the open filehandle with a `bifrost.udp_socket.UDPSocket` instance and\n",
        " 2. Trade `bifrost.packet_writer.DiskWriter` for `bifrost.packet_writer.UDPTransmit`."
      ]
    },
    {
      "cell_type": "markdown",
      "id": "f8c48a43",
      "metadata": {
        "id": "f8c48a43"
      },
      "source": [
        "## Adding a New Packet Format\n",
        "\n",
        "Adding a new packet format to Bifrost is a straightforward task:\n",
        " 1. Add a new `.hpp` file to `bifrost/src/formats`.  This file should contain:\n",
        "  * the header format for the packet, \n",
        "  * a sub-class of the C++ `PacketDecoder` class that implements a packet validator,\n",
        "  * a sub-class of the C++ `PacketProcessor` class that implements an unpacket to take the payload for a valid packet and place it in the correct position inside a ring buffer, and\n",
        "  * *optionally*, a sub-class of the C++ `PacketHeaderFiller` class that can be used when creating packets from Bifrost.\n",
        " 2. Add the new format to `packet_capture.hpp`.  This has three parts:\n",
        "  * Add a new method to the C++ class `BFpacketcapture_callback_impl` to handle the sequence change callback.\n",
        "  * Add new sub-class of the C++ `BFpacketcapture_impl` class that implements the format and defines how Bifrost detects changes in the packet sequence.\n",
        "  * Update the C++ function `BFpacketcapture_create` to expose the new packet format.\n",
        " 3. Add the new format callback helper to `packet_capture.cpp`.\n",
        " 4. Update `bifrost/packet_capture.h` to expose the new callback to the Python API.\n",
        " 5. *Optionally*, add support for writing packets:\n",
        "  * Add the new format to `packet_writer.hpp` by adding a new sub-class of the C++ `BFpacketwriter_impl` class.\n",
        "  * Update the C++ `BFpacketwriter_create` function to expose the new packet format."
      ]
    }
  ],
  "metadata": {
    "kernelspec": {
      "display_name": "Python 3",
      "language": "python",
      "name": "python3"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.6.9"
    },
    "colab": {
      "name": "06_data_capture.ipynb",
      "provenance": []
    },
    "accelerator": "GPU",
    "gpuClass": "standard"
  },
  "nbformat": 4,
  "nbformat_minor": 5
}
